package com.huan.flink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Writes a small, fixed stream of {@code Student} records into a MySQL table through the Flink
 * JDBC connector.
 * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/jdbc/">jdbc connector</a>
 *
 * <p>The connection settings can be overridden without recompiling by setting the environment
 * variables {@code MYSQL_SINK_URL}, {@code MYSQL_SINK_USERNAME} and {@code MYSQL_SINK_PASSWORD}.
 * When a variable is not set, the built-in local-development default is used.
 *
 * @author huan.fu
 * @date 2023/9/23 - 15:05
 */
public class MysqlSinkApplication {

    /** Environment variable that overrides the JDBC URL. */
    private static final String ENV_URL = "MYSQL_SINK_URL";
    /** Environment variable that overrides the database user name. */
    private static final String ENV_USERNAME = "MYSQL_SINK_USERNAME";
    /** Environment variable that overrides the database password. */
    private static final String ENV_PASSWORD = "MYSQL_SINK_PASSWORD";

    // Local-development defaults, used only when the matching environment variable is unset.
    private static final String DEFAULT_URL =
            "jdbc:mysql://localhost:3306/temp_work?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PASSWORD = "root@1993";

    /** Parameterized insert executed for every {@code Student}; placeholders are bound in order. */
    private static final String INSERT_SQL = "insert into student(student_id,student_name) values (?,?)";

    /**
     * Builds the pipeline (three in-memory students -> JDBC sink), runs it, and prints
     * {@code over} once {@code execute} returns.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.fromElements(
                        new Student(101, "张三"),
                        new Student(102, "李四"),
                        new Student(103, "王武")
                )
                .addSink(
                        JdbcSink.sink(
                                INSERT_SQL,
                                studentBinder(),
                                executionOptions(),
                                connectionOptions()
                        )
                );

        environment.execute("sink to mysql job");
        System.out.println("over");
    }

    /** Returns the binder that copies a student's id and name into the insert statement. */
    private static JdbcStatementBuilder<Student> studentBinder() {
        return (ps, student) -> {
            ps.setInt(1, student.getStudentId());
            ps.setString(2, student.getStudentName());
        };
    }

    /** Returns the batching and retry settings for the sink. */
    private static JdbcExecutionOptions executionOptions() {
        return JdbcExecutionOptions.builder()
                // Flush once 100 records are buffered; either this or the interval below triggers a write.
                .withBatchSize(100)
                // Flush once 3 seconds have passed; either this or the batch size above triggers a write.
                .withBatchIntervalMs(3000)
                // Retry a failed write up to 3 times.
                .withMaxRetries(3)
                .build();
    }

    /** Returns the connection settings, taking each value from the environment when it is set. */
    private static JdbcConnectionOptions connectionOptions() {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(envOrDefault(ENV_URL, DEFAULT_URL))
                .withUsername(envOrDefault(ENV_USERNAME, DEFAULT_USERNAME))
                .withPassword(envOrDefault(ENV_PASSWORD, DEFAULT_PASSWORD))
                .withConnectionCheckTimeoutSeconds(60)
                .build();
    }

    /**
     * Looks up an environment variable.
     *
     * @param name     the environment variable name
     * @param fallback the value to return when the variable is not set
     * @return the variable's value, or {@code fallback} when it is unset; a variable that is set
     *         to the empty string is returned as-is so that an empty password stays expressible
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }
}
